package sinktest

import Source.SourceTest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = "E:\\Flinklearn\\Flink\\resources\\data1\\out.txt"
    val inputStream = env.readTextFile(inputPath)

    // 转换成样例类类型
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val fields = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // 定义一个FlinkJedisConfigBase
    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("1234").build()


    dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper))


    env.execute()
  }
}

// 定义一个RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading]{
  // 定义保存数据写入redis的命令 HSET 表名 key value
  override def getCommandDescription: RedisCommandDescription =  new RedisCommandDescription(RedisCommand.HSET,"sensor_temp")

  // 将id指定为key
  override def getKeyFromData(t: SensorReading): String = t.id

  // 将温度值指定为value
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}